package cn.jdemo.guava.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;

/**
 * Disruptor它是一个高性能的异步处理的开源并发框架，
 * 能够在无锁的情况下实现网络的Queue并发操作。
 * 可以认为是最快的消息框架（轻量的JMS），也可以认为是一个观察者模式的实现，或者事件监听模式的实现。
 *
 * RingBuffer：环形队列，是Disruptor最为重要的组件，其作用是存储和更新Disruptor中流通的数据。
 * Sequence：递增序号（AtomicLong），Disruptor使用Sequence标识一个特殊组件处理的序号。每个重要的组件基本都有一个Sequence。
 * Producer：生产者，泛指通过Disruptor发布事件的用户代码（实际业务代码，而并发框架代码）生成Event数据。
 * Event：事件，从生产者到消费者过程中的数据单元。由用户定义代码。
 * EventHandler：消费者，代表Disruptor框架中的一个消费者接口，由用户实现代码，负责处理Event数据，进度通过Sequence控制。
 *
 * Sequecer：Disruptor框架真正的核心，在生产者和消费者直接进行高效准确快速的数据传输。通过复杂的算法去协调生存者和消费者之间的关系。
 * SequenceBarrier：Sequecer具体的实施者，字面理解是序号屏障，其目的是决定消费者 消费Evnet的逻辑。（生产者发布事件快于消费，生产者等待。消费速度大于生产者发布事件速度，消费者监听）
 * EventProcessor：可以理解为具体的消费线程，最后把结果返回给EventHandler。
 * WaitStrategy：当消费者等待在SequenceBarrier上时，有许多可选的等待策略
 *      BusySpinWaitStrategy ： 自旋等待，类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
 *      BlockingWaitStrategy ： 使用锁和条件变量。CPU资源的占用少，延迟大。
 *      SleepingWaitStrategy ： 在多次循环尝试不成功后，选择让出CPU，等待下次调，多次调度后仍不成功，尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用，但延迟不均匀。
 *      YieldingWaitStrategy ： 在多次循环尝试不成功后，选择让出CPU，等待下次调。平衡了延迟和CPU资源占用，但延迟也比较均匀。
 *      PhasedBackoffWaitStrategy ： 上面多种策略的综合，CPU资源的占用少，延迟大。
 */
public class LongEventMain {
    public static void main(String[] args) throws Exception {

        //1. 创建disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>( // (1)
                new LongEventFactory(),             // 事件工厂
                1024 * 1024,                        // RingBuffer 大小，必须是2的N次方
                Executors.defaultThreadFactory(),   // 线程池
                ProducerType.SINGLE,                // 单个生产者，如果有多个生产者必须使用 ProducerType.MULTI
                new YieldingWaitStrategy()          // 生产者和消费者的平衡策略
        );

        //2. 连接消费事件方法
        disruptor.handleEventsWith(new LongEventHandler());

        //3. 启动
        disruptor.start();

        //4. 发布事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for(long l = 0; l<100; l++){
            byteBuffer.putLong(0, l);
            producer.onData(byteBuffer);
            //Thread.sleep(1000);
        }

        disruptor.shutdown();//关闭 disruptor，方法会堵塞，直至所有的事件都得到处理；
    }
}
